iT邦幫忙

2023 iThome 鐵人賽

DAY 29
0
Software Development

Rust Web API 從零開始系列 第 29

Day29 - 附錄: Rust中的非同步程式設計(3)

  • 分享至 

  • xImage
  •  

昨天介紹了future的特徵,現在我們要由上而下的看一個非同步應用程式,今天要介紹的內容參考這裡,我們要看看runtime如何處理一個非同步程式問題。

async/await

假設有一段簡單的非同步的程式碼如下:

#[tokio::main]
async fn main(){
    task1();
    println!("----async_main-----");
    task2().await;
}

async fn task1() {
    println!("-------task1-------");
}

async fn task2() {
    println!("-------task2-------");
}

//// 結果:
//// ----async_main-----
//// -------task2-------

如果跟我一樣是C#的使用者,大概會覺得很納悶,task1()的輸出怎麼不見了?這是因為rust的非同步是延遲評估的,除非遇到await修飾詞,否則那個future物件不會作用。如果要看到task1()如期作用,程式碼要改成這樣:

#[tokio::main]
async fn main(){
    task1().await;
    println!("----async_main-----");
    task2().await;
}

//// 結果:
//// -------task1-------
//// ----async_main-----
//// -------task2-------

如此一來就可以看到main()如我們意料般的輸出。

非同步的狀態機樹

我們來分析一下main()這個非同步方法,內部可以分成幾個狀態:

  1. main()開始(MainStart)
  2. task1()運行(Task1Running)
  3. task2()運行(Task2Running)
  4. main()完成(MainComplete)

可以用一個enum來對方法內狀態建立模型:

enum MainState{
  MainStart,
  Task1Running,
  Task2Running,
  MainComplete
}

task1()task2()這兩個方法也有各自的內部狀態,當然也可以用enum來描述。那麼只要將這些enum透過某種方式連結起來,我是不是就得到了一個能夠描述main()內部所有狀態的模型呢?我們再來改一下上面的MainState

enum MainState{
  MainStart,
  Task1Running(Task1State),
  Task2Running(Task2State),
  MainComplete
}

沒錯,只要善用enum本身的特性,就可以得到一個描述main()中狀態的樹狀結構。
https://ithelp.ithome.com.tw/upload/images/20230926/20148594UJgpykZprF.png
但其實我們可以更進一步把MainState.Task1RuningMainState.Task2Runing的內部成員改成一個future物件:

enum MainState{
  MainStart,
  Task1Running(Task1Future),
  Task2Running(Task2Future),
  MainComplete
}

為什麼要這樣改呢?因為這樣的資料結構有利於runtime走訪應用程式的內部狀態。

實做Future Trait

既然我們要把async fn Main()作為一個future,那就來嘗試實做看看吧,首先定義一個結構體來表示main()

pub struct MainFuture
{
    state: MainState
}

為求便於說明,我們來實做之前用到的SimpleFuture:

impl SimpleFuture for MainFuture {
    type Output = ();

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        match self.state {
            //// 當MainFuture的狀態為MainStart時
            //// 要把狀態變更為下一步Task1Running
            //// 並且回傳Pending
            MainState::MainStart => {
                self.state = MainState::Task1Running(Task1Future::new());
                Poll::Pending
            },
            //// 當MainFuture的狀態為Task1Running時
            //// 須取出task1,並且詢問task1的狀態
            //// 如果task1尚未完成,代表main也還沒完成,所以不做任何事
            //// 如果task1已完成,要把狀態變更為下一步Task2Running
            //// 但main還沒做完,所以回傳Pending
            MainState::Task1Running(task1) => {
                if task1.poll(wake) == Poll::Pending {
                    Poll::Pending
                } else {
                    self.state = MainState::Task2Running(Task2Future::new());
                    Poll::Pending
                }
            },
            //// 當MainFuture的狀態為Task2Running時
            //// 須取出task2,並且詢問task2的狀態
            //// 如果task2尚未完成,代表main也還沒完成,所以不做任何事
            //// 如果task2已完成,要把狀態變更為下一步MainComplete
            //// 實際上Task2完成代表整個Main都完成了
            //// 所以回傳Ready
            MainState::Task2Running(task2) => {
                if task2.poll(wake) == Poll::Pending {
                    Poll::Pending
                } else {
                    self.state = MainState::MainComplete;
                    Poll::Ready(())
                }
            },
            MainState::Complete => Poll::Ready(())
        }
    }
}

上面的程式碼嘗試把main()以一個實做了SimpleFuture的結構來表示,而runtime做的事情就是調用poll方法來詢問main()的狀態。當poll()被調用的時候,MainFuture會依照內部的狀態決定是否要詢問子層future的狀態,並且將wake方法往下傳遞。runtime會在適當的時候調用poll來確認main()的狀態,而這個不斷詢問的過程就是在走訪由各非同步作業組成的狀態樹。

接下來我們節錄一小段tokioblock_on方法看看runtime到底在做什麼:

pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
    use std::task::Context;
    use std::task::Poll::Ready;

    // `get_unpark()` should not return a Result
    let waker = self.waker()?;
    let mut cx = Context::from_waker(&waker);

    pin!(f);

    loop {
        if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
            return Ok(v);
        }

        self.park();
    }
}

整個非同步的故事大概像下面這樣:
https://ithelp.ithome.com.tw/upload/images/20230925/20148594gaPOVvU7cA.png

block_on內會傳遞contex到狀態樹終端的task,這個task通常是應用程式與作業系統的IO邊界,像是發出HTTP請求或是資料庫抓取資料。當呼叫完poll()緊接著就是觸發park(),這個方法的作用是將任務掛住並讓出執行緒,於是迴圈在這邊會被暫時停止。接下來當系統發生IO事件時,比如說發出的HTTP請求收到回覆,這時候終端的任務就會透過waker通知runtime要恢復狀態,迴圈再次進行,因此再次調用了poll()詢訪狀態機樹,推進整個非同步程式的進行。

上面大概說明了rust中是如何思考非同步程式的,編譯器在看到async關鍵字時就會將方法編譯成一個隱含的future trait物件,await則是幫助編譯器判定要如何建立類似enum的資料結構來描述方法內部的狀態,某種程度上rust在編譯時期就會把非同步方法的執行順序排序好了。


上一篇
Day28 - 附錄: Rust中的非同步程式設計(2)
下一篇
Day30 - 附錄: Rust中的非同步程式設計(4)
系列文
Rust Web API 從零開始30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言